bitkeeper revision 1.1686.1.1 (42a5bc8fhkR_9WfuD9N-je5TT27yDw)
authorakw27@arcadians.cl.cam.ac.uk <akw27@arcadians.cl.cam.ac.uk>
Tue, 7 Jun 2005 15:26:07 +0000 (15:26 +0000)
committerakw27@arcadians.cl.cam.ac.uk <akw27@arcadians.cl.cam.ac.uk>
Tue, 7 Jun 2005 15:26:07 +0000 (15:26 +0000)
Parallax code cleanups.

Signed-off-by: andrew.warfield@cl.cam.ac.uk
tools/blktap/block-async.c
tools/blktap/block-async.h
tools/blktap/parallax.c
tools/blktap/requests-async.c

index 6f26071adeae603cb73566a55d0e501af4a4aa89..a0460de6fc8dacc623c531a148ce8dfe5fdac372 100755 (executable)
  */\r
 \r
 struct read_args {\r
-       u64 addr;\r
+    u64 addr;
 };\r
 \r
 struct write_args {\r
-       u64   addr;\r
-       char *block;\r
+    u64   addr;
+    char *block;
 };\r
 \r
 struct alloc_args {\r
-       char *block;\r
+    char *block;
 };\r
  \r
 struct pending_io_req {\r
-       enum {IO_READ, IO_WRITE, IO_ALLOC, IO_RWAKE, IO_WWAKE} op;\r
-       union {\r
-               struct read_args  r;\r
-               struct write_args w;\r
-               struct alloc_args a;\r
-       } u;\r
-       io_cb_t cb;\r
-       void *param;\r
+    enum {IO_READ, IO_WRITE, IO_ALLOC, IO_RWAKE, IO_WWAKE} op;
+    union {
+        struct read_args  r;
+        struct write_args w;
+        struct alloc_args a;
+    } u;
+    io_cb_t cb;
+    void *param;
 };\r
 \r
 void radix_lock_init(struct radix_lock *r)\r
 {\r
-       int i;\r
-       \r
-       pthread_mutex_init(&r->lock, NULL);\r
-       for (i=0; i < 1024; i++) {\r
-               r->lines[i] = 0;\r
-               r->waiters[i] = NULL;\r
-               r->state[i] = ANY;\r
-       }\r
+    int i;
+    
+    pthread_mutex_init(&r->lock, NULL);
+    for (i=0; i < 1024; i++) {
+        r->lines[i] = 0;
+        r->waiters[i] = NULL;
+        r->state[i] = ANY;
+    }
 }\r
 \r
 /* maximum outstanding I/O requests issued asynchronously */\r
 /* must be a power of 2.*/\r
-#define MAX_PENDING_IO 1024 //1024\r
+#define MAX_PENDING_IO 1024
 \r
 /* how many threads to concurrently issue I/O to the disk. */\r
-#define IO_POOL_SIZE   10 //10\r
+#define IO_POOL_SIZE   10
 \r
 static struct pending_io_req pending_io_reqs[MAX_PENDING_IO];\r
 static int pending_io_list[MAX_PENDING_IO];\r
@@ -87,276 +87,268 @@ static pthread_cond_t  pending_io_cond = PTHREAD_COND_INITIALIZER;
 \r
 static void init_pending_io(void)\r
 {\r
-       int i;\r
+    int i;
        \r
-       for (i=0; i<MAX_PENDING_IO; i++)\r
-               pending_io_list[i] = i;\r
+    for (i=0; i<MAX_PENDING_IO; i++)
+        pending_io_list[i] = i;
                \r
 } \r
 \r
 void block_read(u64 addr, io_cb_t cb, void *param)\r
 {\r
-       struct pending_io_req *req;\r
-       \r
-       pthread_mutex_lock(&pending_io_lock);\r
-       assert(CAN_PRODUCE_PENDING_IO);\r
-\r
-       req = PENDING_IO_ENT(io_prod++);\r
-       DPRINTF("Produce (R) %lu (%p)\n", io_prod - 1, req);\r
-       req->op = IO_READ;\r
-       req->u.r.addr = addr;\r
-       req->cb = cb;\r
-       req->param = param;\r
-       \r
+    struct pending_io_req *req;
+    
+    pthread_mutex_lock(&pending_io_lock);
+    assert(CAN_PRODUCE_PENDING_IO);
+    
+    req = PENDING_IO_ENT(io_prod++);
+    DPRINTF("Produce (R) %lu (%p)\n", io_prod - 1, req);
+    req->op = IO_READ;
+    req->u.r.addr = addr;
+    req->cb = cb;
+    req->param = param;
+    
     pthread_cond_signal(&pending_io_cond);\r
-       pthread_mutex_unlock(&pending_io_lock); \r
+    pthread_mutex_unlock(&pending_io_lock);    
 }\r
 \r
 \r
 void block_write(u64 addr, char *block, io_cb_t cb, void *param)\r
 {\r
-       struct pending_io_req *req;\r
-       \r
-       pthread_mutex_lock(&pending_io_lock);\r
-       assert(CAN_PRODUCE_PENDING_IO);\r
-\r
-       req = PENDING_IO_ENT(io_prod++);\r
-       DPRINTF("Produce (W) %lu (%p)\n", io_prod - 1, req);\r
-       req->op = IO_WRITE;\r
-       req->u.w.addr  = addr;\r
-       req->u.w.block = block;\r
-       req->cb = cb;\r
-       req->param = param;\r
-       \r
+    struct pending_io_req *req;
+    
+    pthread_mutex_lock(&pending_io_lock);
+    assert(CAN_PRODUCE_PENDING_IO);
+    
+    req = PENDING_IO_ENT(io_prod++);
+    DPRINTF("Produce (W) %lu (%p)\n", io_prod - 1, req);
+    req->op = IO_WRITE;
+    req->u.w.addr  = addr;
+    req->u.w.block = block;
+    req->cb = cb;
+    req->param = param;
+    
     pthread_cond_signal(&pending_io_cond);\r
-       pthread_mutex_unlock(&pending_io_lock); \r
+    pthread_mutex_unlock(&pending_io_lock);    
 }\r
 \r
 \r
 void block_alloc(char *block, io_cb_t cb, void *param)\r
 {\r
-       struct pending_io_req *req;\r
-       \r
-       pthread_mutex_lock(&pending_io_lock);\r
-       assert(CAN_PRODUCE_PENDING_IO);\r
-\r
-       req = PENDING_IO_ENT(io_prod++);\r
-       req->op = IO_ALLOC;\r
-       req->u.a.block = block;\r
-       req->cb = cb;\r
-       req->param = param;\r
+    struct pending_io_req *req;
        \r
+    pthread_mutex_lock(&pending_io_lock);
+    assert(CAN_PRODUCE_PENDING_IO);
+    
+    req = PENDING_IO_ENT(io_prod++);
+    req->op = IO_ALLOC;
+    req->u.a.block = block;
+    req->cb = cb;
+    req->param = param;
+    
     pthread_cond_signal(&pending_io_cond);\r
-       pthread_mutex_unlock(&pending_io_lock); \r
+    pthread_mutex_unlock(&pending_io_lock);    
 }\r
 \r
 void block_rlock(struct radix_lock *r, int row, io_cb_t cb, void *param)\r
 {\r
-       struct io_ret ret;\r
-       pthread_mutex_lock(&r->lock);\r
-       \r
-       if (( r->lines[row] >= 0 ) && (r->state[row] != STOP)) {\r
-               r->lines[row]++;\r
-               r->state[row] = READ;\r
-               DPRINTF("RLOCK  : %3d (row: %d)\n", r->lines[row], row);\r
-               pthread_mutex_unlock(&r->lock);\r
-               ret.type = IO_INT_T;\r
-               ret.u.i = 0;\r
-               cb(ret, param);\r
-       } else {\r
-               struct radix_wait **rwc;\r
-               struct radix_wait *rw = \r
-                       (struct radix_wait *) malloc (sizeof(struct radix_wait));\r
-               DPRINTF("RLOCK  : %3d (row: %d) -- DEFERRED!\n", r->lines[row], row);\r
-               rw->type  = RLOCK;\r
-               rw->param = param;\r
-               rw->cb    = cb;\r
-               rw->next  = NULL;\r
-               /* append to waiters list. */\r
-               rwc = &r->waiters[row];\r
-               while (*rwc != NULL) rwc = &(*rwc)->next;\r
-               *rwc = rw;\r
-               pthread_mutex_unlock(&r->lock);\r
-               return;\r
-       }\r
+    struct io_ret ret;
+    pthread_mutex_lock(&r->lock);
+    
+    if (( r->lines[row] >= 0 ) && (r->state[row] != STOP)) {
+        r->lines[row]++;
+        r->state[row] = READ;
+        DPRINTF("RLOCK  : %3d (row: %d)\n", r->lines[row], row);
+        pthread_mutex_unlock(&r->lock);
+        ret.type = IO_INT_T;
+        ret.u.i = 0;
+        cb(ret, param);
+    } else {
+        struct radix_wait **rwc;
+        struct radix_wait *rw = 
+            (struct radix_wait *) malloc (sizeof(struct radix_wait));
+        DPRINTF("RLOCK  : %3d (row: %d) -- DEFERRED!\n", r->lines[row], row);
+        rw->type  = RLOCK;
+        rw->param = param;
+        rw->cb    = cb;
+        rw->next  = NULL;
+        /* append to waiters list. */
+        rwc = &r->waiters[row];
+        while (*rwc != NULL) rwc = &(*rwc)->next;
+        *rwc = rw;
+        pthread_mutex_unlock(&r->lock);
+        return;
+    }
 }\r
 \r
 \r
 void block_wlock(struct radix_lock *r, int row, io_cb_t cb, void *param)\r
 {\r
-       struct io_ret ret;\r
-       pthread_mutex_lock(&r->lock);\r
-       \r
-       /* the second check here is redundant -- just here for debugging now. */\r
-       if ((r->state[row] == ANY) && ( r->lines[row] == 0 )) {\r
-               r->state[row] = STOP;\r
-               r->lines[row] = -1;\r
-               DPRINTF("WLOCK  : %3d (row: %d)\n", r->lines[row], row);\r
-               pthread_mutex_unlock(&r->lock);\r
-               ret.type = IO_INT_T;\r
-               ret.u.i = 0;\r
-               cb(ret, param);\r
-       } else {\r
-               struct radix_wait **rwc;\r
-               struct radix_wait *rw = \r
-                       (struct radix_wait *) malloc (sizeof(struct radix_wait));\r
-               DPRINTF("WLOCK  : %3d (row: %d) -- DEFERRED!\n", r->lines[row], row);\r
-               rw->type  = WLOCK;\r
-               rw->param = param;\r
-               rw->cb    = cb;\r
-               rw->next  = NULL;\r
-               /* append to waiters list. */\r
-               rwc = &r->waiters[row];\r
-               while (*rwc != NULL) rwc = &(*rwc)->next;\r
-               *rwc = rw;\r
-               pthread_mutex_unlock(&r->lock);\r
-               return;\r
-       }\r
+    struct io_ret ret;
+    pthread_mutex_lock(&r->lock);
+    
+    /* the second check here is redundant -- just here for debugging now. */
+    if ((r->state[row] == ANY) && ( r->lines[row] == 0 )) {
+        r->state[row] = STOP;
+        r->lines[row] = -1;
+        DPRINTF("WLOCK  : %3d (row: %d)\n", r->lines[row], row);
+        pthread_mutex_unlock(&r->lock);
+        ret.type = IO_INT_T;
+        ret.u.i = 0;
+        cb(ret, param);
+    } else {
+        struct radix_wait **rwc;
+        struct radix_wait *rw = 
+            (struct radix_wait *) malloc (sizeof(struct radix_wait));
+        DPRINTF("WLOCK  : %3d (row: %d) -- DEFERRED!\n", r->lines[row], row);
+        rw->type  = WLOCK;
+        rw->param = param;
+        rw->cb    = cb;
+        rw->next  = NULL;
+        /* append to waiters list. */
+        rwc = &r->waiters[row];
+        while (*rwc != NULL) rwc = &(*rwc)->next;
+        *rwc = rw;
+        pthread_mutex_unlock(&r->lock);
+        return;
+    }
        \r
 }\r
 \r
 /* called with radix_lock locked and lock count of zero. */\r
 static void wake_waiters(struct radix_lock *r, int row)\r
 {\r
-       struct pending_io_req *req;\r
-       struct radix_wait *rw;\r
-       \r
-       DPRINTF("prewake\n");\r
-       if (r->lines[row] != 0) return;\r
-       if (r->waiters[row] == NULL) {DPRINTF("nowaiters!\n");return;} \r
-       \r
-       DPRINTF("wake\n");\r
-       if (r->waiters[row]->type == WLOCK) {\r
-               rw = r->waiters[row];\r
-               pthread_mutex_lock(&pending_io_lock);\r
-               assert(CAN_PRODUCE_PENDING_IO);\r
-\r
-               req = PENDING_IO_ENT(io_prod++);\r
-               DPRINTF("Produce (WWAKE) %lu (%p)\n", io_prod - 1, req);\r
-               req->op    = IO_WWAKE;\r
-               req->cb    = rw->cb;\r
-               req->param = rw->param;\r
-               r->lines[row] = -1; /* write lock the row. */\r
-               r->state[row] = STOP;\r
-               r->waiters[row] = rw->next;\r
-               free(rw);\r
-               pthread_mutex_unlock(&pending_io_lock);\r
-       } else /* RLOCK */ {\r
-               while ((r->waiters[row] != NULL) && (r->waiters[row]->type == RLOCK)) {\r
-                       rw = r->waiters[row];\r
-                       pthread_mutex_lock(&pending_io_lock);\r
-                       assert(CAN_PRODUCE_PENDING_IO);\r
-       \r
-                       req = PENDING_IO_ENT(io_prod++);\r
-                       DPRINTF("Produce (RWAKE) %lu (%p)\n", io_prod - 1, req);\r
-                       req->op    = IO_RWAKE;\r
-                       req->cb    = rw->cb;\r
-                       req->param = rw->param;\r
-                       r->lines[row]++; /* read lock the row. */\r
-                       r->state[row] = READ; \r
-                       r->waiters[row] = rw->next;\r
-                       free(rw);\r
-                       pthread_mutex_unlock(&pending_io_lock);\r
-               }\r
-               if (r->waiters[row] != NULL) /* There is a write queued still */\r
-                       r->state[row] = STOP;\r
-       }       \r
-       \r
-       DPRINTF("wakedone\n");\r
-       DPRINTF("prod: %lu cons: %lu free: %lu\n", io_prod, io_cons, io_free);\r
-       pthread_mutex_lock(&pending_io_lock);\r
+    struct pending_io_req *req;
+    struct radix_wait *rw;
+    
+    if (r->lines[row] != 0) return;
+    if (r->waiters[row] == NULL) return; 
+    
+    if (r->waiters[row]->type == WLOCK) {
+
+        rw = r->waiters[row];
+        pthread_mutex_lock(&pending_io_lock);
+        assert(CAN_PRODUCE_PENDING_IO);
+        
+        req = PENDING_IO_ENT(io_prod++);
+        req->op    = IO_WWAKE;
+        req->cb    = rw->cb;
+        req->param = rw->param;
+        r->lines[row] = -1; /* write lock the row. */
+        r->state[row] = STOP;
+        r->waiters[row] = rw->next;
+        free(rw);
+        pthread_mutex_unlock(&pending_io_lock);
+    
+    } else /* RLOCK */ {
+
+        while ((r->waiters[row] != NULL) && (r->waiters[row]->type == RLOCK)) {
+            rw = r->waiters[row];
+            pthread_mutex_lock(&pending_io_lock);
+            assert(CAN_PRODUCE_PENDING_IO);
+            
+            req = PENDING_IO_ENT(io_prod++);
+            req->op    = IO_RWAKE;
+            req->cb    = rw->cb;
+            req->param = rw->param;
+            r->lines[row]++; /* read lock the row. */
+            r->state[row] = READ; 
+            r->waiters[row] = rw->next;
+            free(rw);
+            pthread_mutex_unlock(&pending_io_lock);
+        }
+
+        if (r->waiters[row] != NULL) /* There is a write queued still */
+            r->state[row] = STOP;
+    }  
+    
+    pthread_mutex_lock(&pending_io_lock);
     pthread_cond_signal(&pending_io_cond);\r
-       pthread_mutex_unlock(&pending_io_lock);\r
+    pthread_mutex_unlock(&pending_io_lock);
 }\r
 \r
 void block_runlock(struct radix_lock *r, int row, io_cb_t cb, void *param)\r
 {\r
-       struct io_ret ret;\r
+    struct io_ret ret;
        \r
-       pthread_mutex_lock(&r->lock);\r
-       assert(r->lines[row] > 0); /* try to catch misuse. */\r
-       r->lines[row]--;\r
-       DPRINTF("RUNLOCK: %3d (row: %d)\n", r->lines[row], row);\r
-       if (r->lines[row] == 0) {\r
-               r->state[row] = ANY;\r
-               wake_waiters(r, row);\r
-       }\r
-       pthread_mutex_unlock(&r->lock);\r
-       cb(ret, param);\r
+    pthread_mutex_lock(&r->lock);
+    assert(r->lines[row] > 0); /* try to catch misuse. */
+    r->lines[row]--;
+    if (r->lines[row] == 0) {
+        r->state[row] = ANY;
+        wake_waiters(r, row);
+    }
+    pthread_mutex_unlock(&r->lock);
+    cb(ret, param);
 }\r
 \r
 void block_wunlock(struct radix_lock *r, int row, io_cb_t cb, void *param)\r
 {\r
-       struct io_ret ret;\r
-       \r
-       pthread_mutex_lock(&r->lock);\r
-       assert(r->lines[row] == -1); /* try to catch misuse. */\r
-       r->lines[row] = 0;\r
-       r->state[row] = ANY;\r
-       DPRINTF("WUNLOCK: %3d (row: %d)\n", r->lines[row], row);\r
-       wake_waiters(r, row);\r
-       pthread_mutex_unlock(&r->lock);\r
-       cb(ret, param);\r
+    struct io_ret ret;
+    
+    pthread_mutex_lock(&r->lock);
+    assert(r->lines[row] == -1); /* try to catch misuse. */
+    r->lines[row] = 0;
+    r->state[row] = ANY;
+    wake_waiters(r, row);
+    pthread_mutex_unlock(&r->lock);
+    cb(ret, param);
 }\r
 \r
 /* consumer calls */\r
 static void do_next_io_req(struct pending_io_req *req)\r
 {\r
-       struct io_ret          ret;\r
-       void  *param;\r
-       \r
-       switch (req->op) {\r
-       case IO_READ:\r
-               ret.type = IO_BLOCK_T;\r
-               ret.u.b  = readblock(req->u.r.addr);\r
-               break;\r
-       case IO_WRITE:\r
-               ret.type = IO_INT_T;\r
-               ret.u.i  = writeblock(req->u.w.addr, req->u.w.block);\r
-               DPRINTF("wrote %d at %Lu\n", *(int *)(req->u.w.block), req->u.w.addr);\r
-               break;\r
-       case IO_ALLOC:\r
-               ret.type = IO_ADDR_T;\r
-               ret.u.a  = allocblock(req->u.a.block);\r
-               break;\r
-       case IO_RWAKE:\r
-               DPRINTF("WAKE DEFERRED RLOCK!\n");\r
-               ret.type = IO_INT_T;\r
-               ret.u.i  = 0;\r
-               break;\r
-       case IO_WWAKE:\r
-               DPRINTF("WAKE DEFERRED WLOCK!\n");\r
-               ret.type = IO_INT_T;\r
-               ret.u.i  = 0;\r
-               break;\r
-       default:\r
-               DPRINTF("Unknown IO operation on pending list!\n");\r
-               return;\r
-       }\r
+    struct io_ret          ret;
+    void  *param;
+    
+    switch (req->op) {
+    case IO_READ:
+        ret.type = IO_BLOCK_T;
+        ret.u.b  = readblock(req->u.r.addr);
+        break;
+    case IO_WRITE:
+        ret.type = IO_INT_T;
+        ret.u.i  = writeblock(req->u.w.addr, req->u.w.block);
+        DPRINTF("wrote %d at %Lu\n", *(int *)(req->u.w.block), req->u.w.addr);
+        break;
+    case IO_ALLOC:
+        ret.type = IO_ADDR_T;
+        ret.u.a  = allocblock(req->u.a.block);
+        break;
+    case IO_RWAKE:
+        DPRINTF("WAKE DEFERRED RLOCK!\n");
+        ret.type = IO_INT_T;
+        ret.u.i  = 0;
+        break;
+    case IO_WWAKE:
+        DPRINTF("WAKE DEFERRED WLOCK!\n");
+        ret.type = IO_INT_T;
+        ret.u.i  = 0;
+        break;
+    default:
+        DPRINTF("Unknown IO operation on pending list!\n");
+        return;
+    }
+    
+    param = req->param;
+    pthread_mutex_lock(&pending_io_lock);
+    pending_io_list[PENDING_IO_MASK(io_free++)] = PENDING_IO_IDX(req);
+    pthread_mutex_unlock(&pending_io_lock);
        \r
-       param = req->param;\r
-       DPRINTF("freeing idx %d to slot %lu.\n", PENDING_IO_IDX(req), PENDING_IO_MASK(io_free));\r
-       pthread_mutex_lock(&pending_io_lock);\r
-       pending_io_list[PENDING_IO_MASK(io_free++)] = PENDING_IO_IDX(req);\r
-       DPRINTF("       : prod: %lu cons: %lu free: %lu\n", io_prod, io_cons, io_free);\r
-       pthread_mutex_unlock(&pending_io_lock);\r
-       \r
-       assert(req->cb != NULL);\r
-       req->cb(ret, param);\r
-               \r
+    assert(req->cb != NULL);
+    req->cb(ret, param);
+    
 }\r
 \r
 void *io_thread(void *param) \r
 {\r
-       int tid;\r
-       struct pending_io_req *req;\r
-       \r
-       /* Set this thread's tid. */\r
+    int tid;
+    struct pending_io_req *req;
+    
+    /* Set this thread's tid. */
     tid = *(int *)param;\r
     free(param);\r
     \r
-    DPRINTF("IOT %2d started.\n", tid);\r
-    \r
 start:\r
     pthread_mutex_lock(&pending_io_lock);\r
     while (io_prod == io_cons) {\r
@@ -369,15 +361,12 @@ start:
         goto start;\r
     }\r
     \r
-       req = PENDING_IO_ENT(io_cons++);\r
-       DPRINTF("IOT %2d has req %04d(%p).\n", tid, PENDING_IO_IDX(req), req);\r
-       DPRINTF("       : prod: %lu cons: %lu free: %lu\n", io_prod, io_cons, io_free);\r
-       pthread_mutex_unlock(&pending_io_lock);\r
-       \r
+    req = PENDING_IO_ENT(io_cons++);
+    pthread_mutex_unlock(&pending_io_lock);
        \r
     do_next_io_req(req);\r
     \r
-       goto start;\r
+    goto start;
        \r
 }\r
 \r
@@ -385,9 +374,9 @@ static pthread_t io_pool[IO_POOL_SIZE];
 void start_io_threads(void)\r
 \r
 {      \r
-       int i, tid=0;\r
-       \r
-        for (i=0; i < IO_POOL_SIZE; i++) {\r
+    int i, tid=0;
+    
+    for (i=0; i < IO_POOL_SIZE; i++) {
         int ret, *t;\r
         t = (int *)malloc(sizeof(int));\r
         *t = tid++;\r
@@ -399,6 +388,6 @@ void start_io_threads(void)
 \r
 void init_block_async(void)\r
 {\r
-       init_pending_io();\r
-       start_io_threads();\r
+    init_pending_io();
+    start_io_threads();
 }\r
index b19d464a52dfc9dbf34e809711312062411a578b..022eea5da1be2b5797c39153ec583caa74aef9e0 100755 (executable)
 \r
 struct io_ret\r
 {\r
-       enum {IO_ADDR_T, IO_BLOCK_T, IO_INT_T} type;\r
-       union {\r
-               u64   a;\r
-               char *b;\r
-               int   i;\r
-       } u;\r
+    enum {IO_ADDR_T, IO_BLOCK_T, IO_INT_T} type;
+    union {
+        u64   a;
+        char *b;
+        int   i;
+    } u;
 };\r
 \r
 typedef void (*io_cb_t)(struct io_ret r, void *param);\r
 \r
 /* per-vdi lock structures to make sure requests run in a safe order. */\r
 struct radix_wait {\r
-       enum {RLOCK, WLOCK} type;\r
-       io_cb_t  cb;\r
-       void    *param;\r
-       struct radix_wait *next;\r
+    enum {RLOCK, WLOCK} type;
+    io_cb_t  cb;
+    void    *param;
+    struct radix_wait *next;
 };\r
 \r
 struct radix_lock {\r
-       pthread_mutex_t lock;\r
-       int                    lines[1024];\r
-       struct radix_wait     *waiters[1024];\r
-       enum {ANY, READ, STOP} state[1024];\r
+    pthread_mutex_t lock;
+    int                    lines[1024];
+    struct radix_wait     *waiters[1024];
+    enum {ANY, READ, STOP} state[1024];
 };\r
 void radix_lock_init(struct radix_lock *r);\r
 \r
@@ -49,20 +49,20 @@ void init_block_async(void);
 \r
 static inline u64 IO_ADDR(struct io_ret r)\r
 {\r
-       assert(r.type == IO_ADDR_T);\r
-       return r.u.a;\r
+    assert(r.type == IO_ADDR_T);
+    return r.u.a;
 }\r
 \r
 static inline char *IO_BLOCK(struct io_ret r)\r
 {\r
-       assert(r.type == IO_BLOCK_T);\r
-       return r.u.b;\r
+    assert(r.type == IO_BLOCK_T);
+    return r.u.b;
 }\r
 \r
 static inline int IO_INT(struct io_ret r)\r
 {\r
-       assert(r.type == IO_INT_T);\r
-       return r.u.i;\r
+    assert(r.type == IO_INT_T);
+    return r.u.i;
 }\r
 \r
 \r
index 50d282cf0abfa074501b8bbb635bd3da0bbad146..3f59834f12ba866a4940af8def0cf11a6229f05e 100644 (file)
@@ -328,33 +328,33 @@ typedef struct {
 pending_t pending_list[MAX_REQUESTS];
 
 struct cb_param {
-       pending_t *pent;
-       int       segment;
-       u64       sector; 
-       u64       vblock; /* for debug printing -- can be removed. */
+    pending_t *pent;
+    int       segment;
+    u64       sector; 
+    u64       vblock; /* for debug printing -- can be removed. */
 };
 
 static void read_cb(struct io_ret r, void *in_param)
 {
-       struct cb_param *param = (struct cb_param *)in_param;
-       pending_t *p = param->pent;
-       int segment = param->segment;
-       blkif_request_t *req = p->req;
+    struct cb_param *param = (struct cb_param *)in_param;
+    pending_t *p = param->pent;
+    int segment = param->segment;
+    blkif_request_t *req = p->req;
     unsigned long size, offset, start;
-       char *dpage, *spage;
+    char *dpage, *spage;
        
-       spage  = IO_BLOCK(r);
-       if (spage == NULL) { p->error++; goto finish; }
-       dpage  = (char *)MMAP_VADDR(ID_TO_IDX(req->id), segment);
+    spage  = IO_BLOCK(r);
+    if (spage == NULL) { p->error++; goto finish; }
+    dpage  = (char *)MMAP_VADDR(ID_TO_IDX(req->id), segment);
     
     /* Calculate read size and offset within the read block. */
 
     offset = (param->sector << SECTOR_SHIFT) % BLOCK_SIZE;
     size = ( blkif_last_sect (req->frame_and_sects[segment]) -
              blkif_first_sect(req->frame_and_sects[segment]) + 1
-           ) << SECTOR_SHIFT;
+        ) << SECTOR_SHIFT;
     start = blkif_first_sect(req->frame_and_sects[segment]) 
-            << SECTOR_SHIFT;
+        << SECTOR_SHIFT;
 
     DPRINTF("ParallaxRead: sect: %lld (%ld,%ld),  "
             "vblock %llx, "
@@ -371,23 +371,23 @@ static void read_cb(struct io_ret r, void *in_param)
     pthread_mutex_lock(&p->mutex);
     p->count--;
     
-       if (p->count == 0) {
+    if (p->count == 0) {
        blkif_response_t *rsp;
        
         rsp = (blkif_response_t *)req;
         rsp->id = req->id;
         rsp->operation = BLKIF_OP_READ;
        if (p->error == 0) {
-               rsp->status = BLKIF_RSP_OKAY;
+            rsp->status = BLKIF_RSP_OKAY;
        } else {
-               rsp->status = BLKIF_RSP_ERROR;
+            rsp->status = BLKIF_RSP_ERROR;
        }
         blktap_inject_response(rsp);       
     }
     
     pthread_mutex_unlock(&p->mutex);
        
-       free(param); /* TODO: replace with cached alloc/dealloc */
+    free(param); /* TODO: replace with cached alloc/dealloc */
 }      
 
 int parallax_read(blkif_request_t *req, blkif_t *blkif)
@@ -414,21 +414,20 @@ int parallax_read(blkif_request_t *req, blkif_t *blkif)
         pthread_t tid;
         int ret;
         struct cb_param *p;
-
-           /* Round the requested segment to a block address. */
-           sector  = req->sector_number + (8*i);
-           vblock = (sector << SECTOR_SHIFT) >> BLOCK_SHIFT;
-
-               /* TODO: Replace this call to malloc with a cached allocation */
-               p = (struct cb_param *)malloc(sizeof(struct cb_param));
-               p->pent = pent;
-               p->sector = sector; 
-               p->segment = i;     
-               p->vblock = vblock; /* dbg */
-               
-           /* Get that block from the store. */
-           async_read(vdi, vblock, read_cb, (void *)p);
-
+        
+        /* Round the requested segment to a block address. */
+        sector  = req->sector_number + (8*i);
+        vblock = (sector << SECTOR_SHIFT) >> BLOCK_SHIFT;
+        
+        /* TODO: Replace this call to malloc with a cached allocation */
+        p = (struct cb_param *)malloc(sizeof(struct cb_param));
+        p->pent = pent;
+        p->sector = sector; 
+        p->segment = i;     
+        p->vblock = vblock; /* dbg */
+        
+        /* Get that block from the store. */
+        async_read(vdi, vblock, read_cb, (void *)p);    
     }
     
     return BLKTAP_STOLEN;
@@ -444,33 +443,33 @@ err:
 
 static void write_cb(struct io_ret r, void *in_param)
 {
-       struct cb_param *param = (struct cb_param *)in_param;
-       pending_t *p = param->pent;
-       blkif_request_t *req = p->req;
-
-       /* catch errors from the block code. */
-       if (IO_INT(r) < 0) p->error++;
-       
+    struct cb_param *param = (struct cb_param *)in_param;
+    pending_t *p = param->pent;
+    blkif_request_t *req = p->req;
+    
+    /* catch errors from the block code. */
+    if (IO_INT(r) < 0) p->error++;
+    
     pthread_mutex_lock(&p->mutex);
     p->count--;
     
-       if (p->count == 0) {
+    if (p->count == 0) {
        blkif_response_t *rsp;
        
         rsp = (blkif_response_t *)req;
         rsp->id = req->id;
         rsp->operation = BLKIF_OP_WRITE;
        if (p->error == 0) {
-               rsp->status = BLKIF_RSP_OKAY;
+            rsp->status = BLKIF_RSP_OKAY;
        } else {
-               rsp->status = BLKIF_RSP_ERROR;
+            rsp->status = BLKIF_RSP_ERROR;
        }
         blktap_inject_response(rsp);       
     }
     
     pthread_mutex_unlock(&p->mutex);
        
-       free(param); /* TODO: replace with cached alloc/dealloc */
+    free(param); /* TODO: replace with cached alloc/dealloc */
 }
 
 int parallax_write(blkif_request_t *req, blkif_t *blkif)
@@ -496,7 +495,7 @@ int parallax_write(blkif_request_t *req, blkif_t *blkif)
     
     for (i = 0; i < req->nr_segments; i++) {
         struct cb_param *p;
-            
+        
         spage  = (char *)MMAP_VADDR(ID_TO_IDX(req->id), i);
         
         /* Round the requested segment to a block address. */
@@ -509,7 +508,7 @@ int parallax_write(blkif_request_t *req, blkif_t *blkif)
         offset = (sector << SECTOR_SHIFT) % BLOCK_SIZE;
         size = ( blkif_last_sect (req->frame_and_sects[i]) -
                  blkif_first_sect(req->frame_and_sects[i]) + 1
-               ) << SECTOR_SHIFT;
+            ) << SECTOR_SHIFT;
         start = blkif_first_sect(req->frame_and_sects[i]) << SECTOR_SHIFT;
 
         DPRINTF("ParallaxWrite: sect: %lld (%ld,%ld),  "
@@ -527,15 +526,15 @@ int parallax_write(blkif_request_t *req, blkif_t *blkif)
             goto err;
         }
         
-               /* TODO: Replace this call to malloc with a cached allocation */
-               p = (struct cb_param *)malloc(sizeof(struct cb_param));
-               p->pent = pent;
-               p->sector = sector; 
-               p->segment = i;     
-               p->vblock = vblock; /* dbg */
-               
+        /* TODO: Replace this call to malloc with a cached allocation */
+        p = (struct cb_param *)malloc(sizeof(struct cb_param));
+        p->pent = pent;
+        p->sector = sector; 
+        p->segment = i;     
+        p->vblock = vblock; /* dbg */
+        
         /* Issue the write to the store. */
-           async_write(vdi, vblock, spage, write_cb, (void *)p);
+        async_write(vdi, vblock, spage, write_cb, (void *)p);
     }
 
     return BLKTAP_STOLEN;
index bb2d07b60ab39c869a2c1362d2c55100e0e7c22c..f68ae76db4ed1606b4f52ecfd9e32d7c4cd3a35c 100755 (executable)
@@ -1,6 +1,6 @@
-/* read.c\r
+/* requests-async.c
  *\r
- * asynchronous read experiment for parallax.\r
+ * asynchronous request dispatcher for radix access in parallax.
  */\r
 \r
 #include <stdio.h>\r
@@ -17,9 +17,6 @@
 #define L3_IDX(_a) (((_a) & 0x00000000000001ffULL))\r
 \r
 \r
-\r
-//#define STANDALONE\r
-\r
 #if 0\r
 #define DPRINTF(_f, _a...) printf ( _f , ## _a )\r
 #else\r
@@ -45,10 +42,10 @@ struct io_req {
 \r
 void clear_w_bits(radix_tree_node node) \r
 {\r
-       int i;\r
-       for (i=0; i<RADIX_TREE_MAP_ENTRIES; i++)\r
-               node[i] = node[i] & ONEMASK;\r
-       return;\r
+    int i;
+    for (i=0; i<RADIX_TREE_MAP_ENTRIES; i++)
+        node[i] = node[i] & ONEMASK;
+    return;
 }\r
 \r
 enum states {\r
@@ -89,15 +86,15 @@ enum states {
     ALLOC_L3_L2f,\r
     WRITE_L2_L3f,\r
 \r
-       /* L1 Zero Path */\r
+    /* L1 Zero Path */
     ALLOC_DATA_L1z,\r
     ALLOC_L3_L1z,\r
     ALLOC_L2_L1z,\r
     WRITE_L1_L1z,\r
 \r
-       /* L1 Fault Path */\r
-       READ_L2_L1f,\r
-       READ_L3_L1f,\r
+    /* L1 Fault Path */
+    READ_L2_L1f,
+    READ_L3_L1f,
     ALLOC_DATA_L1f,\r
     ALLOC_L3_L1f,\r
     ALLOC_L2_L1f,\r
@@ -123,9 +120,9 @@ int async_read(vdi_t *vdi, u64 vaddr, io_cb_t cb, void *param)
     DPRINTF("async_read\n");\r
 \r
     req = (struct io_req *)malloc(sizeof (struct io_req));\r
-       req->radix[0] = req->radix[1] = req->radix[2] = NULL;\r
+    req->radix[0] = req->radix[1] = req->radix[2] = NULL;
 \r
-       if (req == NULL) {perror("req was NULL in async_read"); return(-1); }\r
+    if (req == NULL) {perror("req was NULL in async_read"); return(-1); }
        \r
     req->op    = IO_OP_READ;\r
     req->root  = vdi->radix_root;\r
@@ -135,7 +132,7 @@ int async_read(vdi_t *vdi, u64 vaddr, io_cb_t cb, void *param)
     req->param = param;\r
     req->state = READ_LOCKED;\r
 \r
-       block_rlock(req->lock, L1_IDX(vaddr), read_cb, req);\r
+    block_rlock(req->lock, L1_IDX(vaddr), read_cb, req);
        \r
     return 0;\r
 }\r
@@ -148,10 +145,9 @@ int   async_write(vdi_t *vdi, u64 vaddr, char *block,
 \r
 \r
     req = (struct io_req *)malloc(sizeof (struct io_req));\r
-       req->radix[0] = req->radix[1] = req->radix[2] = NULL;\r
-    //DPRINTF("async_write\n");\r
+    req->radix[0] = req->radix[1] = req->radix[2] = NULL;
     \r
-       if (req == NULL) {perror("req was NULL in async_write"); return(-1); }\r
+    if (req == NULL) {perror("req was NULL in async_write"); return(-1); }
 \r
     req->op    = IO_OP_WRITE;\r
     req->root  = vdi->radix_root;\r
@@ -163,10 +159,10 @@ int   async_write(vdi_t *vdi, u64 vaddr, char *block,
     req->radix_addr[L1] = getid(req->root); /* for consistency */\r
     req->state = WRITE_LOCKED;\r
 \r
-       block_wlock(req->lock, L1_IDX(vaddr), write_cb, req);\r
+    block_wlock(req->lock, L1_IDX(vaddr), write_cb, req);
 \r
 \r
-       return 0;\r
+    return 0;
 }\r
 \r
 void read_cb(struct io_ret ret, void *param)\r
@@ -197,11 +193,11 @@ void read_cb(struct io_ret ret, void *param)
         idx  = getid( node[L1_IDX(req->vaddr)] );\r
         free(block);\r
         if ( idx == ZERO ) {\r
-               req->state = RETURN_ZERO;\r
-               block_runlock(req->lock, L1_IDX(req->vaddr), read_cb, req);\r
+            req->state = RETURN_ZERO;
+            block_runlock(req->lock, L1_IDX(req->vaddr), read_cb, req);
         } else {\r
-               req->state = READ_L2;\r
-               block_read(idx, read_cb, req);\r
+            req->state = READ_L2;
+            block_read(idx, read_cb, req);
         }\r
         break;\r
 \r
@@ -214,11 +210,11 @@ void read_cb(struct io_ret ret, void *param)
         idx  = getid( node[L2_IDX(req->vaddr)] );\r
         free(block);\r
         if ( idx == ZERO ) {\r
-               req->state = RETURN_ZERO;\r
-               block_runlock(req->lock, L1_IDX(req->vaddr), read_cb, req);\r
+            req->state = RETURN_ZERO;
+            block_runlock(req->lock, L1_IDX(req->vaddr), read_cb, req);
         } else {\r
-               req->state = READ_L3;\r
-               block_read(idx, read_cb, req);\r
+            req->state = READ_L3;
+            block_read(idx, read_cb, req);
         }\r
         break;\r
 \r
@@ -231,11 +227,11 @@ void read_cb(struct io_ret ret, void *param)
         idx  = getid( node[L3_IDX(req->vaddr)] );\r
         free(block);\r
         if ( idx == ZERO )  {\r
-               req->state = RETURN_ZERO;\r
-               block_runlock(req->lock, L1_IDX(req->vaddr), read_cb, req);\r
+            req->state = RETURN_ZERO;
+            block_runlock(req->lock, L1_IDX(req->vaddr), read_cb, req);
         } else {\r
-               req->state = READ_DATA;\r
-               block_read(idx, read_cb, req);\r
+            req->state = READ_DATA;
+            block_read(idx, read_cb, req);
         }\r
         break;\r
 \r
@@ -249,9 +245,9 @@ void read_cb(struct io_ret ret, void *param)
         break;\r
         \r
     case READ_UNLOCKED:\r
-       {\r
-               struct io_ret r;\r
-               io_cb_t cb;\r
+    {
+        struct io_ret r;
+        io_cb_t cb;
         DPRINTF("READ_UNLOCKED\n");\r
         req_param = req->param;\r
         r         = req->retval;\r
@@ -262,18 +258,18 @@ void read_cb(struct io_ret ret, void *param)
     }\r
     \r
     case RETURN_ZERO:\r
-       {\r
-               struct io_ret r;\r
-               io_cb_t cb;\r
-           DPRINTF("RETURN_ZERO\n");\r
-           req_param = req->param;\r
+    {
+        struct io_ret r;
+        io_cb_t cb;
+        DPRINTF("RETURN_ZERO\n");
+        req_param = req->param;
         cb        = req->cb;\r
-           free(req);\r
+        free(req);
         r.type = IO_BLOCK_T;\r
         r.u.b = newblock();\r
-           cb(r, req_param);\r
-           break;\r
-       }\r
+        cb(r, req_param);
+        break;
+    }
         \r
     default:\r
        DPRINTF("*** Write: Bad state! (%d) ***\n", req->state);\r
@@ -283,16 +279,16 @@ void read_cb(struct io_ret ret, void *param)
     return;\r
 \r
  fail:\r
-       {\r
-               struct io_ret r;\r
-               io_cb_t cb;\r
-               DPRINTF("asyn_read had a read error.\n");\r
+    {
+        struct io_ret r;
+        io_cb_t cb;
+        DPRINTF("asyn_read had a read error.\n");
         req_param = req->param;\r
         r         = ret;\r
         cb        = req->cb;\r
         free(req);\r
         cb(r, req_param);\r
-       }\r
+    }
 \r
 \r
 }\r
@@ -304,11 +300,10 @@ void write_cb(struct io_ret r, void *param)
     u64 a, addr;\r
     void *req_param;\r
 \r
-    //DPRINTF("write_cb\n");\r
     switch(req->state) {\r
        \r
     case WRITE_LOCKED:\r
-    \r
+        
         DPRINTF("WRITE_LOCKED (%llu)\n", L1_IDX(req->vaddr));\r
        req->state = READ_L1;\r
        block_read(getid(req->root), write_cb, req); \r
@@ -326,9 +321,9 @@ void write_cb(struct io_ret r, void *param)
         req->radix[L1] = node;\r
 \r
         if ( addr == ZERO ) {\r
-               /* L1 empty subtree: */\r
-               req->state = ALLOC_DATA_L1z;\r
-               block_alloc( req->block, write_cb, req );\r
+            /* L1 empty subtree: */
+            req->state = ALLOC_DATA_L1z;
+            block_alloc( req->block, write_cb, req );
         } else if ( !iswritable(a) ) {\r
             /* L1 fault: */\r
             req->state = READ_L2_L1f;\r
@@ -351,7 +346,7 @@ void write_cb(struct io_ret r, void *param)
         req->radix[L2] = node;\r
 \r
         if ( addr == ZERO ) {\r
-               /* L2 empty subtree: */\r
+            /* L2 empty subtree: */
             req->state = ALLOC_DATA_L2z;\r
             block_alloc( req->block, write_cb, req );\r
         } else if ( !iswritable(a) ) {\r
@@ -447,7 +442,7 @@ void write_cb(struct io_ret r, void *param)
         addr = getid(a);\r
 \r
         req->radix[L3] = node;\r
-               req->state = ALLOC_DATA_L2f;\r
+        req->state = ALLOC_DATA_L2f;
         block_alloc( req->block, write_cb, req );\r
         break;\r
                 \r
@@ -520,14 +515,14 @@ void write_cb(struct io_ret r, void *param)
         req->radix[L2] = node;\r
         \r
         if (addr == ZERO) {\r
-               /* nothing below L2, create an empty L3 and alloc data. */\r
-               /* (So skip READ_L3_L1f.) */\r
-               req->radix[L3] = newblock();\r
-               req->state = ALLOC_DATA_L1f;\r
-               block_alloc( req->block, write_cb, req );\r
+            /* nothing below L2, create an empty L3 and alloc data. */
+            /* (So skip READ_L3_L1f.) */
+            req->radix[L3] = newblock();
+            req->state = ALLOC_DATA_L1f;
+            block_alloc( req->block, write_cb, req );
         } else {\r
-                       req->state = READ_L3_L1f;\r
-                       block_read( addr, write_cb, req );\r
+            req->state = READ_L3_L1f;
+            block_read( addr, write_cb, req );
         }\r
         break;\r
         \r
@@ -541,7 +536,7 @@ void write_cb(struct io_ret r, void *param)
         addr = getid(a);\r
 \r
         req->radix[L3] = node;\r
-               req->state = ALLOC_DATA_L1f;\r
+        req->state = ALLOC_DATA_L1f;
         block_alloc( req->block, write_cb, req );\r
         break;\r
                 \r
@@ -587,7 +582,7 @@ void write_cb(struct io_ret r, void *param)
         DPRINTF("DONE\n");\r
         /* free any saved node vals. */\r
         for (i=0; i<3; i++)\r
-               if (req->radix[i] != 0) free(req->radix[i]);\r
+            if (req->radix[i] != 0) free(req->radix[i]);
         req->retval = r;\r
         req->state = WRITE_UNLOCKED;\r
         block_wunlock(req->lock, L1_IDX(req->vaddr), write_cb, req);\r
@@ -601,7 +596,7 @@ void write_cb(struct io_ret r, void *param)
         req_param = req->param;\r
         r         = req->retval;\r
         cb        = req->cb;\r
-           free(req);\r
+        free(req);
         cb(r, req_param);\r
         break;\r
     }\r
@@ -614,16 +609,16 @@ void write_cb(struct io_ret r, void *param)
     return;\r
     \r
  fail:\r
-       {\r
-               struct io_ret r;\r
-               io_cb_t cb;\r
-               DPRINTF("asyn_write had a read error mid-way.\n");\r
+    {
+        struct io_ret r;
+        io_cb_t cb;
+        DPRINTF("asyn_write had a read error mid-way.\n");
         req_param = req->param;\r
         cb        = req->cb;\r
         r.type = IO_INT_T;\r
         r.u.i  = -1;\r
         free(req);\r
         cb(r, req_param);\r
-       }\r
+    }
 }\r
 \r